package com.dliyun.oap.framework.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.dliyun.oap.framework.SystemParameterNames;
import com.dliyun.oap.framework.ThreadFerry;
import com.dliyun.oap.framework.context.OapContext;
import com.dliyun.oap.framework.context.OapRequestContext;
import com.dliyun.oap.framework.context.RequestContextBuilder;
import com.dliyun.oap.framework.context.ServletRequestContextBuilder;
import com.dliyun.oap.framework.exception.OapException;
import com.dliyun.oap.framework.exception.TransportException;
import com.dliyun.oap.framework.impl.DefaultTransportSecurity;
import com.dliyun.oap.framework.impl.DumbThreadFerry;
import com.dliyun.oap.framework.interceptor.OapInterceptor;
import com.dliyun.oap.framework.request.OapRequest;
import com.dliyun.oap.framework.security.TransportSecurity;
import com.dliyun.oap.framework.response.*;
import com.dliyun.oap.framework.security.AppSecretManager;
import com.dliyun.oap.framework.security.SecurityManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.springframework.http.HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS;
import static org.springframework.http.HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN;

/**
 * Servlet-facing entry point of the OAP framework.
 *
 * <p>For every HTTP request it validates the app key header, reads and decrypts the request body,
 * hands the actual service invocation to a worker pool and waits for it (bounded by a timeout).
 * Failures detected on the servlet thread are reported to the client as an {@link OapResponse}.
 *
 * @author stjdydayou
 */
@Slf4j
public class RouterServiceImpl implements ApplicationContextAware, RouterService, InitializingBean {

    /**
     * Name of the HTTP request header that carries the caller's app key.
     */
    private static final String APP_KEY_HEADER = "X-APP-KEY";

    private final Marshaller jsonMarshaller = new FastJsonMarshaller();

    private final OapContext oapContext = new OapContext();

    private final RequestContextBuilder requestContextBuilder = new ServletRequestContextBuilder();

    private final ServiceMethodAdapter serviceMethodAdapter = new ServiceMethodAdapter();

    private ApplicationContext applicationContext;

    /**
     * Optional {@link ThreadFerry} implementation; when unset a {@link DumbThreadFerry} is used.
     */
    private Class<? extends ThreadFerry> threadFerryClass;

    /**
     * Pool that runs the service invocations; a default one is created in
     * {@link #afterPropertiesSet()} when none was injected.
     */
    private ThreadPoolExecutor threadPoolExecutor;

    @Autowired
    private TransportSecurity transportSecurity;

    @Autowired
    private SecurityManager securityManager;

    @Autowired
    private AppSecretManager appSecretManager;

    /**
     * Default timeout, in seconds, applied to a service method that does not define a positive
     * timeout of its own. {@link #setTimeoutSeconds(Integer)} ignores null and non-positive
     * values, so this field always stays positive.
     */
    private int timeoutSeconds = 60;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Handles one HTTP request end to end.
     *
     * <p>The app key and the (decrypted) request body are checked on the calling thread; the
     * service invocation itself runs on {@link #threadPoolExecutor} while this thread waits for
     * at most the timeout resolved by {@link #getServiceMethodTimeout(String, String)}. Any failure
     * detected here is written to the client as a failure {@link OapResponse}. The response output
     * stream is always flushed and closed before returning.
     *
     * @param servletRequest  the incoming request
     * @param servletResponse the response to write to
     */
    @Override
    public void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
        String appKey = servletRequest.getHeader(APP_KEY_HEADER);
        OapResponse failResponse = null;
        Future<?> future = null;

        try {
            if (!appSecretManager.isValidAppKey(appKey)) {
                failResponse = OapResponse.fail("INVALID_APP_KEY", "无效的appKey,请使用合法的appKey（由服务提供商授予你）");
                return;
            }

            // Read the raw body; decryption of the client payload happens right below.
            String streamString = getStreamString(servletRequest);
            if (StringUtils.isBlank(streamString)) {
                failResponse = OapResponse.fail("STREAM_STRING_EMPTY", "缺少必要的参数，请确认是否使用了application/json提交的数据");
                return;
            }

            Map<String, Object> requestParams = this.getRequestParams(appKey, streamString);

            // Resolve the timeout that applies to the requested service method.
            String method = stringParam(requestParams, SystemParameterNames.METHOD);
            String version = stringParam(requestParams, SystemParameterNames.VERSION);
            if (log.isDebugEnabled()) {
                log.debug("调用服务方法：" + method + "(" + version + ")");
            }
            int serviceMethodTimeout = getServiceMethodTimeout(method, version);

            // Thread ferry: capture state on the servlet thread, replay it on the worker thread.
            ThreadFerry threadFerry = buildThreadFerryInstance();
            threadFerry.doInSrcThread();

            ServiceRunnable runnable = new ServiceRunnable(servletRequest, servletResponse, requestParams, threadFerry, appKey);
            future = this.threadPoolExecutor.submit(runnable);
            // A single get() either returns once the task is done or throws; calling it
            // unconditionally also surfaces failures of a task that finished right after submit.
            future.get(serviceMethodTimeout, TimeUnit.SECONDS);
        } catch (TransportException ex) {
            log.error("", ex);
            failResponse = OapResponse.fail("ENCRYPT_KEY_ERROR", "数据加密错误，请确认加密方式与密钥是否正确");
        } catch (RejectedExecutionException ex) {
            // The executor refused the task: the platform is out of capacity.
            log.error("超过最大资源限制，无法提供服务。", ex);
            failResponse = OapResponse.fail("REJECTED_REQUEST", "请求被禁止,禁止请求，因服务平台已经满负荷，请稍候再试");
        } catch (TimeoutException ex) {
            // Stop (or never start) the task so it does not keep working on, or later write to,
            // a response that is closed below.
            future.cancel(true);
            log.error("调用服务方法超时。", ex);
            failResponse = OapResponse.fail("SERVICE_TIMEOUT", "调用服务超时，请和服务平台提供商联系");
        } catch (InterruptedException ex) {
            // Restore the interrupt flag for the container and abandon the task; the client gets
            // the same response as for any other unexpected failure.
            Thread.currentThread().interrupt();
            future.cancel(true);
            log.error("调用服务方法产生异常", ex);
            failResponse = OapResponse.fail("SERVICE_UNAVAILABLE", String.format("调用后端服务异常。异常信息:%s", ex));
        } catch (Throwable throwable) {
            log.error("调用服务方法产生异常", throwable);
            failResponse = OapResponse.fail("SERVICE_UNAVAILABLE", String.format("调用后端服务异常。异常信息:%s", throwable));
        } finally {
            try {
                if (failResponse != null) {
                    writeResponse(appKey, failResponse, servletResponse);
                }
                servletResponse.getOutputStream().flush();
                servletResponse.getOutputStream().close();
            } catch (IOException e) {
                log.error("关闭响应出错", e);
            }
        }
    }

    @Override
    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }


    /**
     * Boots the framework once the bean's properties are set: registers converters, installs
     * default collaborators for anything that was not injected, then registers service handlers
     * and interceptors found in the Spring context.
     */
    @Override
    public void afterPropertiesSet() {
        Assert.notNull(this.applicationContext, "Spring上下文不能为空");

        if (log.isInfoEnabled()) {
            log.info("开始启动OAP框架...");
        }

        this.requestContextBuilder.registerConverters(this.applicationContext);

        // Default asynchronous executor.
        // NOTE(review): with an unbounded LinkedBlockingQueue a ThreadPoolExecutor never grows
        // beyond corePoolSize and never rejects tasks, so maximumPoolSize has no effect and the
        // REJECTED_REQUEST path in service() cannot trigger with this default pool. Inject a
        // bounded executor via setThreadPoolExecutor if load shedding is wanted.
        if (this.threadPoolExecutor == null) {
            this.threadPoolExecutor = new ThreadPoolExecutor(200, Integer.MAX_VALUE, 5 * 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {
                private final AtomicInteger atomicInteger = new AtomicInteger();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "Default Thread Pool: " + atomicInteger.getAndIncrement());
                }
            });
        }

        // Default transport-level security (request decryption / response encryption).
        if (this.transportSecurity == null) {
            this.transportSecurity = new DefaultTransportSecurity();
        }

        // Register service methods.
        this.oapContext.registerServiceHandler(this.applicationContext);

        // Register interceptors.
        this.oapContext.registerInterceptors(this.applicationContext);
    }

    @Override
    public void setThreadFerryClass(Class<? extends ThreadFerry> threadFerryClass) {
        this.threadFerryClass = threadFerryClass;
    }

    /**
     * Sets the default service timeout.
     *
     * @param timeoutSeconds timeout in seconds; {@code null}, zero and negative values are ignored
     *                       and leave the current value unchanged
     */
    @Override
    public void setTimeoutSeconds(Integer timeoutSeconds) {
        if (log.isDebugEnabled()) {
            log.debug("serviceTimeoutSeconds set to {}", timeoutSeconds);
        }
        if (timeoutSeconds != null && timeoutSeconds > 0) {
            this.timeoutSeconds = timeoutSeconds;
        }
    }

    @Override
    public OapContext getOapContext() {
        return this.oapContext;
    }


    /**
     * Creates the {@link ThreadFerry} for one request: an instance of the configured class, or a
     * {@link DumbThreadFerry} when none is configured.
     */
    private ThreadFerry buildThreadFerryInstance() {
        if (threadFerryClass == null) {
            return new DumbThreadFerry();
        }
        return BeanUtils.instantiateClass(threadFerryClass);
    }


    /**
     * Reads the request body as UTF-8 text.
     *
     * <p>Lines are concatenated without their line terminators. An {@link IOException} is logged
     * and whatever was read up to that point is returned.
     *
     * @param request the request whose body is read
     * @return the body text, possibly empty
     */
    private String getStreamString(HttpServletRequest request) {
        StringBuilder sb = new StringBuilder();
        try {
            // The reader is intentionally not closed here, as in the original code; closing it
            // would close the request's input stream.
            InputStream inputStream = request.getInputStream();
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
            // Only materialize the body string for logging when it will actually be logged.
            if (log.isDebugEnabled()) {
                log.debug(sb.toString());
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return sb.toString();
    }

    /**
     * Decrypts the raw request body with the transport security and parses it as a JSON object.
     *
     * @param appKey       the caller's app key, passed to the decryption
     * @param streamString the raw request body
     * @return the parsed request parameters
     * @throws TransportException if decryption fails
     */
    private Map<String, Object> getRequestParams(String appKey, String streamString) throws TransportException {
        String decrypted = transportSecurity.requestDecrypt(appKey, streamString);
        return JSON.parseObject(decrypted, new TypeReference<Map<String, Object>>() {
        });
    }

    /**
     * Returns the string form of a request parameter, or an empty string when the key is absent
     * or mapped to {@code null}.
     */
    private static String stringParam(Map<String, Object> params, Object key) {
        Object value = params.get(key);
        return value == null ? "" : value.toString();
    }

    /**
     * Resolves the timeout for a service method: the method's own timeout when a handler is
     * registered and that timeout is positive, otherwise the default {@link #timeoutSeconds}.
     *
     * @param method  service method name
     * @param version service method version
     * @return the timeout in seconds
     */
    private int getServiceMethodTimeout(String method, String version) {
        ServiceMethodHandler handler = oapContext.getServiceMethodHandler(method, version);
        if (handler == null) {
            return this.timeoutSeconds;
        }
        int methodTimeout = handler.getServiceMethodDefinition().getTimeout();
        return methodTimeout > 0 ? methodTimeout : this.timeoutSeconds;
    }

    /**
     * Writes a response to the client as JSON, adding permissive CORS headers.
     *
     * @param appKey      the caller's app key, passed to the marshaller together with the
     *                    transport security
     * @param responseObj the response to marshal
     * @param response    the servlet response to write to
     * @throws OapException wrapping any failure while writing
     */
    private void writeResponse(String appKey, OapResponse responseObj, HttpServletResponse response) {
        try {
            response.addHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            response.addHeader(ACCESS_CONTROL_ALLOW_METHODS, "*");
            response.setCharacterEncoding("UTF-8");
            response.setContentType("application/json");
            this.jsonMarshaller.marshaller(appKey, responseObj, response.getOutputStream(), this.transportSecurity);
        } catch (Exception e) {
            throw new OapException(e);
        }
    }

    /**
     * Runs every interceptor's {@code beforeService} hook, stopping at the first interceptor that
     * puts a response into the context. If an interceptor throws, the context's response is set to
     * a {@link ServiceUnavailableErrorResponse}.
     *
     * @param arc           the request context
     * @param requestParams the request parameters
     */
    private void invokeBeforeServiceOfInterceptors(OapRequestContext arc, Map<String, Object> requestParams) {
        OapInterceptor current = null;
        try {
            List<OapInterceptor> interceptors = this.oapContext.getInterceptors();
            for (OapInterceptor interceptor : interceptors) {
                current = interceptor;
                current.beforeService(arc, requestParams);
                // As soon as one interceptor produced a response, skip the remaining ones.
                if (arc.getOapResponse() != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("拦截器[" + interceptor.getClass().getName() + "]产生了一个OapResponse, 阻止本次服务请求继续，服务将直接返回。");
                    }
                    return;
                }
            }
        } catch (Throwable e) {
            arc.setOapResponse(new ServiceUnavailableErrorResponse(e));
            if (current != null) {
                log.error("在执行拦截器[" + current.getClass().getName() + "]时发生异常.", e);
            }
        }
    }

    /**
     * Runs every interceptor's {@code beforeResponse} hook after the service call and before the
     * response is written. If an interceptor throws, the context's response is set to a
     * {@link ServiceUnavailableErrorResponse} and the remaining interceptors are skipped.
     *
     * @param arc           the request context
     * @param requestParams the request parameters
     */
    private void invokeBeforeResponseOfInterceptors(OapRequestContext arc, Map<String, Object> requestParams) {
        OapInterceptor current = null;
        try {
            List<OapInterceptor> interceptors = this.oapContext.getInterceptors();
            for (OapInterceptor interceptor : interceptors) {
                current = interceptor;
                current.beforeResponse(arc, requestParams);
            }
        } catch (Throwable e) {
            arc.setOapResponse(new ServiceUnavailableErrorResponse(e));
            if (current != null) {
                log.error("在执行拦截器[" + current.getClass().getName() + "]时发生异常.", e);
            }
        }
    }


    /**
     * Invokes the service method for a fully bound request.
     *
     * @param oapRequest the bound request
     * @return the handler's response; a failure response when the method name is missing, the
     *         method check fails or the handler did not return an {@link OapResponse}; a
     *         {@link ServiceUnavailableErrorResponse} when the invocation throws
     */
    private OapResponse doService(OapRequest oapRequest) {
        OapRequestContext context = oapRequest.getRequestContext();
        try {
            if (context.getMethod() == null) {
                return OapResponse.fail("MISSING_METHOD", String.format("服务请求缺少方法名参数:%s，缺少方法", SystemParameterNames.METHOD));
            }
            // NOTE(review): this answers INVALID_METHOD when isValidMethod(...) returns true, which
            // reads as inverted. OapContext is not visible from here, so the condition is kept
            // as-is - confirm the semantics of isValidMethod.
            if (oapContext.isValidMethod(context.getMethod())) {
                return OapResponse.fail("INVALID_METHOD", "无效的方法，请检查服务方法名是否正确");
            }
            Object result = serviceMethodAdapter.invokeServiceMethod(oapRequest);
            // instanceof also covers a null result, which is reported as an invalid response
            // instead of surfacing as a NullPointerException.
            if (result instanceof OapResponse) {
                return (OapResponse) result;
            }
            return OapResponse.fail("INVALID_RESPONSE", "返回必须是" + OapResponse.class.getName());
        } catch (Exception e) {
            // Any failure of the invocation is reported as "service unavailable".
            if (log.isInfoEnabled()) {
                log.info("调用" + context.getMethod() + "时发生异常，异常信息为：" + e.getMessage(), e);
            }
            return new ServiceUnavailableErrorResponse(e);
        }
    }


    /**
     * Worker-side half of a request: builds the request context, validates it, runs interceptors
     * and the service method, and writes the resulting response.
     */
    private class ServiceRunnable implements Runnable {

        private final HttpServletRequest servletRequest;
        private final HttpServletResponse servletResponse;
        private final ThreadFerry threadFerry;
        private final String appKey;
        private final Map<String, Object> requestParams;

        private ServiceRunnable(HttpServletRequest servletRequest, HttpServletResponse servletResponse, Map<String, Object> requestParams, ThreadFerry threadFerry, String appKey) {
            this.servletRequest = servletRequest;
            this.servletResponse = servletResponse;
            this.requestParams = requestParams;
            this.threadFerry = threadFerry;
            this.appKey = appKey;
        }

        @Override
        public void run() {
            OapRequestContext orc = null;
            OapResponse response = null;
            try {
                if (threadFerry != null) {
                    threadFerry.doInDestThread();
                }

                // Phase one binding: build the request context from the system-level parameters.
                orc = requestContextBuilder.buildBySysParams(oapContext, servletRequest, servletResponse, requestParams, this.appKey);

                // Validate the system-level parameters; a non-null result is the error to return.
                response = securityManager.validateSystemParameters(orc, oapContext);
                if (response != null) {
                    return;
                }

                // Phase two binding: bind the business data.
                OapRequest oapRequest = requestContextBuilder.buildOapRequest(orc, requestParams);

                // Remaining checks (business data validity, business-level security, ...).
                response = securityManager.validateOther(orc, requestParams);
                if (response != null) {
                    return;
                }

                // Interceptors before the service call.
                invokeBeforeServiceOfInterceptors(orc, requestParams);

                // Call the service method only when no interceptor produced a response.
                if (orc.getOapResponse() == null) {
                    response = doService(oapRequest);
                } else {
                    response = orc.getOapResponse();
                }
                orc.setOapResponse(response);
            } catch (Throwable e) {
                log.error("", e);
                response = new ServiceUnavailableErrorResponse(e);
                if (orc != null) {
                    orc.setOapResponse(response);
                }
            } finally {
                if (orc != null) {
                    // Record the end of the service call, then run the pre-response interceptors.
                    orc.setServiceEndTime(System.currentTimeMillis());
                    invokeBeforeResponseOfInterceptors(orc, requestParams);
                }
                // Write the response even when no request context could be built; otherwise the
                // error produced above would be dropped and the client would get an empty body.
                writeResponse(this.appKey, response, servletResponse);
            }
        }
    }
}
